Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

address some comments for 11792 #11816

Merged
merged 3 commits into from
Dec 23, 2024
Merged

Conversation

binmahone
Copy link
Collaborator

#11792 is merged without @revans2 's approval because of #11792 (comment). A few comments are received afterwards from @abellina @gerashegalov and @revans2 . This PR is to address their comments

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone
Copy link
Collaborator Author

build

@binmahone binmahone mentioned this pull request Dec 9, 2024
Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone
Copy link
Collaborator Author

build

// repartitioning out (considering we're changing seed each time we repartition).
// However for some test cases with really small batch size, this can happen. So
// we're just logging some warnings here.
log.warn("The bucket is still too large after " + recursiveDepth +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use scala string interpolation here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also not clear to me why we end up in this situation in the tests. Can you give me an example?

Copy link
Collaborator Author

@binmahone binmahone Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @abellina , if you replace this warning with a throw, and run DATAGEN_SEED=1732770215 SPARK_HOME=...... integration_tests/run_pyspark_from_build.sh -k 'hash_aggregate_test and spark.rapids.sql.batchSizeBytes' (the seed is same as in #11790 (comment)), then you'll get see failed tests like:

img_v3_02hl_004f86d4-c406-485b-956a-1279e4ad50eg

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use scala string interpolation here?

done

// duplicated rows (the duplication only happens in different batches) to prevent
// repartitioning out (considering we're changing seed each time we repartition).
// However for some test cases with really small batch size, this can happen. So
// we're just logging some warnings here.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set the max to a higher number and throw?

Copy link
Collaborator Author

@binmahone binmahone Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid not. Even if we change the max repartition number from 10 to 20, there will still be failed test cases because of this. Actually, given a small enough batch size, we can always create a contrived case where it "can not repartiton out"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been thinking about this a little more and I think we do need to throw an exception, but we might need to fix some other issues first. We should be doing some debugging to understand fully why the tests are outputting this warning/failing.

Our tests are a little crazy because we can set the batch size to be very small, which can expose some bad cases. But if we do the math this should not be that big of a problem. By default we split the input data 16 ways on each pass. We set the batch size bytes to 250 bytes in some extreme tests. That means if we have a limit of 9 times that we can partition the data before we get a warning that means we have (250 * 16 ^ 9). That is 15.625 TiB of data, assuming that there are no hash collisions.

That said we should be able to know how many partitions we need to do up front because we know the size, row count, and batch count for all of the input data. We used 16 on the join because we didn't know how much data would be processed yet.

I think what is more likely happening is that because we have lots of small batches, after a single pass of the aggregate no output batches have been reduced in size. So then we start trying to partition, but one or two rows for the same key might be large enough that we cannot hit that 250 byte batch size Because of that it does not matter how many times we re-partition the data it will never be small enough.

That is a live lock situation I want to avoid. So we either need to throw an exception when we hit this case or we need to just give up trying to make the data smaller and just cross our fingers and hope that we have enough memory to make it work.

But this is just me speculating. Someone needs to debug what is happening with the failing tests and understand how we can fix them.

Copy link
Collaborator Author

@binmahone binmahone Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I can look into it for a further look. But it might take a while and this PR will be pending longer, is that ok to you (considering this is a follow-up work of #11792 )? @revans2

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some debugging of my own and I understand what is happening. I reproduced the issue by running

env TEST_PARALLEL=0 MAX_PARALLEL=20 DATAGEN_SEED=1732770215 ./run_pyspark_from_build.sh -k 'test_hash_multiple_mode_query' -s 2>&1 | tee log.txt

against spark 3.4.0. Not all of the tests failed, but two of them did. I also added in a lot of debugging.

It turns out that the problem is because of how Spark handles NULL in a hash. A NULL is a noop and returns the seed unchanged. This can lead to a high number of hash collisions. So if we are hashing two columns (NaN, NULL) and (NULL, NaN) (in this case), then the two have the same hash, but different values.

So if we have a very small batch size (like in the tests) and NULLs show up in the data we can run into situations like this.

    // Deal with the over sized buckets
    def needRepartitionAgain(bucket: AutoClosableArrayBuffer[SpillableColumnarBatch]) = {
      bucket.map(_.sizeInBytes).sum > targetMergeBatchSize &&
        bucket.size() != 1 &&
        !bucket.forall(_.numRows() == 1) // this is for test
    }

needRepartionAgain has a check to see if all of the batches have a single row in them to avoid issues with the tests, but technically this is flawed because our hash function is flawed. Using the CUDF implementation instead of the Spark one might fix the issue, but there could be other issues hiding in the hash function. As such I am fine with how the code is now, but I would like a follow on issue where we explore the idea of doing the re-partition once, because we know the size of the input data.

Copy link
Collaborator Author

@binmahone binmahone Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Bobby, I think we don't know the size of the input data ? In previous sort based agg implementation, all input batches are fetched before doing sort, but in current repartition based agg implementation, we repartition each input batch before next input batch is fetched. We did so to avoid unnecessary spills.

Copy link
Collaborator

@revans2 revans2 Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true for the first pass. But if a second pass is needed we know that size of the data because we have processed the entire input at that point and made it spillable. We know a lot at that point.

  1. The size of the input data in bytes
  2. The number of rows total and per batch
  3. The number of input batches (although my debugging showed that we are putting a lot of empty batches in here and should probably filter them out)

Along with this we know that all of the rows in a batch have unique keys (because it has passed the merge stage)

From this we can derive that the number of unique keys in the bucket is between aggregateBatches.map(_.numRows).max() and aggregatebatches.map(_numRows).total

That means if we have a good hash implementation, which we do not but can probably fix, then we should be able to split the input so for a second pass so that we have

val minCardinality = aggreateBatches.map(_.numRows).max()
val targetBatchesBySize = aggregateBatches.map(_.size).sum() / targetBatchSize
val numBuckets = min(minCardinality, targetBatchesBySize)

If after re-partitioning it the second time, then we have probably done the best we could possibly have done and we should just try and make the aggregation work. This should make the worst case situation much better because we will only go through the data 3 times at the most instead of the 11 times max that we have today.

  1. to read it in and repartition it once
  2. to read in repartitioned data and repartition it a second time
  3. to read in second order repartitioned data and do the merge aggregation.

The second optimization that I would make (especially if we want to reduce spilling). Is to not re-partition everything in one go.

The order of operations should be something like the following. Note there is a lot of hand waving here.

val buckets = repartition_pass_1
val remaining = mergeSmallEnoughBucketsAndReturnFromIterator(buckets)
remaining.for_each { tooLargeBucket =>
  val buckets = repartition_pass_2(tooLargeBucket)
  mergeBucketsAndRetrunFromIterator(buckets)
}

This way we will have released as much spilled data as we can after the first repartition pass instead of holding on to it just so we can finish repartitioning everything. It should not reduce the maximum memory used, but it should reduce how long we are at that maximum memory used mark.

I'm not sure if this is going to help in practice. A good hash algorithm should spread the data fairly evenly so we should not see much skew/hash collisions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if a second pass is needed we know that size of the data because we have processed the entire input at that point and made it spillable. -- Not sure if you noticed when reviewing, but in my understanding this is no longer true now, please check :

, the input batches are directly repartitioned as we're iterating the input firstPassIter. I made this change in the repartition-agg PR to avoid spilling.

I'm going to merge this PR first. Please feel free to continue the discussion here.

// repartitioning out (considering we're changing seed each time we repartition).
// However for some test cases with really small batch size, this can happen. So
// we're just logging some warnings here.
log.warn("The bucket is still too large after " + recursiveDepth +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also not clear to me why we end up in this situation in the tests. Can you give me an example?

@binmahone binmahone changed the base branch from branch-24.12 to branch-25.02 December 13, 2024 00:10
@binmahone binmahone dismissed gerashegalov’s stale review December 13, 2024 00:10

The base branch was changed.

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone
Copy link
Collaborator Author

build

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the problem now. The issue could happen in production (in theory), but it is so unlikely that it really does not matter. I would like a follow on to fix some things with partitioning, but it is not a big deal and I can file it myself.

@revans2
Copy link
Collaborator

revans2 commented Dec 20, 2024

@abellina could you please take another look? I debugged what was happening with the test and it is really unlikely to be a production problem, but I want to hear your opinion on it.

@abellina
Copy link
Collaborator

I'll take a look @revans2 @binmahone, it might not be today, but I'll try to at least get the gist of it from Bobby.

@binmahone binmahone merged commit f0c35ff into NVIDIA:branch-25.02 Dec 23, 2024
50 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants